#!/usr/bin/python -W ignore::DeprecationWarning
# A BackupPC script to archive a host's files to Amazon S3.
#
# Point $Conf{ArchiveClientCmd} at me.
# Requires python-boto
#
# Usage: BackupPC_archiveHost tarCreatePath splitPath parPath host bkupNum \
#             compPath fileExt splitSize outLoc parFile share
#
# Create secrets.py such that it has:
# accesskey = 'amazon aws access key'
# sharedkey = 'amazon aws shared key'
# gpgsymmetrickey = 'gpg symmetric key -- make it good, but do not lose it'
#
# Copyright (c) 2009-2011 Ryan S. Tucker
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

import glob
import hashlib
import os
import socket
import sys
import time

from multiprocessing import Process, Queue, cpu_count
from subprocess import *

from boto.s3.connection import S3Connection
from boto.s3.key import Key
import boto.exception

import logging
import logging.handlers

import secrets

# Module-wide logger. Every record at DEBUG or above is emitted twice:
# to the local syslog socket (/dev/log, "daemon" facility) and to stdout.
logger = logging.getLogger(__name__)

sysloghandler = logging.handlers.SysLogHandler(
    '/dev/log', facility=logging.handlers.SysLogHandler.LOG_DAEMON)
syslogformatter = logging.Formatter('%(filename)s: %(levelname)s: %(message)s')

consolehandler = logging.StreamHandler(sys.stdout)
consoleformatter = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s')

# Attach syslog first, then console, each with its own format.
for _handler, _formatter in ((sysloghandler, syslogformatter),
                             (consolehandler, consoleformatter)):
    _handler.setFormatter(_formatter)
    logger.addHandler(_handler)
del _handler, _formatter

logger.setLevel(logging.DEBUG)

class VerifyError(Exception):
    """Raised when an uploaded S3 object does not match its local file."""

def is_exe(fpath):
    """Return True if *fpath* is an existing regular file we may execute.

    Directories are rejected explicitly. They normally carry the execute
    (search) permission bit, so an ``os.access(..., os.X_OK)`` check on its
    own would report them as runnable programs.
    """
    return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

def encrypt_file(filename, key, compress='/bin/cat'):
    """Symmetrically encrypt *filename* with gpg, producing ``filename.gpg``.

    :param filename: path of the plaintext file; it is left in place.
    :param key: passphrase, written to gpg's stdin (``--passphrase-fd 0``).
    :param compress: path of the BackupPC compression program. Only its
        basename is used, to choose gpg's ``--compress-algo`` (cat -> none,
        gzip -> ZLIB, bzip2 -> BZIP2; anything else -> none).
    :returns: the path of the encrypted file.
    :raises RuntimeError: if /usr/bin/gpg is not executable, if gpg exits
        with a non-zero status, or if no output file was produced.
    """
    compressmap = {'cat': 'none', 'gzip': 'ZLIB', 'bzip2': 'BZIP2'}
    compress_algo = compressmap.get(os.path.basename(compress), 'none')

    outfile = filename + '.gpg'
    cmd = ['/usr/bin/gpg', '--batch', '--no-tty']
    cmd.extend(['--compress-algo', compress_algo])
    cmd.extend(['--output', outfile])
    cmd.extend(['--passphrase-fd', '0'])
    cmd.extend(['--symmetric', filename])

    if not is_exe(cmd[0]):
        raise RuntimeError('%s is not an executable file!' % cmd[0])
    logger.debug('encrypt_file: encrypting %s (compression: %s)' % (filename, compress_algo))

    # Run gpg at reduced priority; the passphrase is fed on stdin.
    proc = Popen(cmd, preexec_fn=lambda: os.nice(10), stdin=PIPE, stdout=PIPE)
    proc.communicate(key)

    # A failed gpg run can leave a truncated output file behind. Checking the
    # exit status stops such a file from being reported as a good result.
    if proc.returncode != 0:
        raise RuntimeError('gpg exited with status %i while encrypting %s'
                           % (proc.returncode, filename))
    if not os.path.exists(outfile):
        raise RuntimeError('output file does not exist: %s.gpg' % filename)

    oldfilesize = os.path.getsize(filename)
    newfilesize = os.path.getsize(outfile)
    if oldfilesize:
        compressed = ((oldfilesize - newfilesize) / float(oldfilesize)) * 100
    else:
        # Empty input: no meaningful ratio, and dividing would fail.
        compressed = 0.0
    logger.debug('encrypt_file: %s %s by %.2f%% (%i -> %i bytes)' % (filename, 'shrunk' if oldfilesize > newfilesize else 'grew', abs(compressed), oldfilesize, newfilesize))
    return outfile

def open_s3(accesskey, sharedkey, host):
    """Return the private S3 bucket used for *host*'s backups.

    The bucket name is ``<accesskey>-bkup-<host>``, lower-cased. The
    connection uses HTTPS. If fetching the bucket raises any
    ``S3ResponseError``, an attempt is made to create it. The ACL is then
    (re)set to ``private`` on every call.
    """
    bucketname = ('%s-bkup-%s' % (accesskey, host)).lower()
    connection = S3Connection(accesskey, sharedkey, is_secure=True)
    try:
        bucket = connection.get_bucket(bucketname)
    except boto.exception.S3ResponseError:
        # Presumably the bucket does not exist yet (e.g. first run for this
        # host). Note that every S3 error is treated this way.
        logger.info('open_s3: creating new bucket %s' % bucketname)
        bucket = connection.create_bucket(bucketname)
    bucket.set_acl('private')
    return bucket

def handle_progress(transmitted, pending):
    """Progress callback for boto uploads (passed as ``cb`` in send_file).

    :param transmitted: number of bytes sent so far.
    :param pending: total size of the upload in bytes. Despite its name, it
        is used as the denominator of the percentage.
    """
    if pending:
        percent = (transmitted / float(pending)) * 100
    else:
        # A zero-byte upload has nothing to send; treat it as complete
        # instead of dividing by zero.
        percent = 100.0
    logger.debug("send_file: %i of %i bytes transmitted (%.2f%%)", transmitted, pending, percent)

def verify_file(bucket, filename):
    """Check that *filename* was uploaded to *bucket* intact.

    The object is looked up under the file's basename. The check passes only
    if the object exists, its size equals the local file size, and its ETag
    equals the quoted hex MD5 of the local file. The ETag comparison is only
    valid for single-part uploads, which is how send_file uploads.

    :returns: True if size and MD5 match, False otherwise.
    :raises OSError: if *filename* cannot be stat'ed or read.
    """
    basefilename = os.path.basename(filename)
    key = bucket.get_key(basefilename)
    localsize = os.stat(filename).st_size
    if not key or key.size != localsize:
        return False

    # Hash in binary mode and in fixed-size chunks. Split archive pieces can
    # be very large, so the file is never loaded into memory all at once.
    local_md5 = hashlib.md5()
    with open(filename, 'rb') as fp:
        for chunk in iter(lambda: fp.read(1024 * 1024), b''):
            local_md5.update(chunk)

    logger.debug('verify_file: %s: local md5 "%s", etag %s', filename, local_md5.hexdigest(), key.etag)
    return '"%s"' % local_md5.hexdigest() == key.etag

def send_file(bucket, filename):
    """Upload *filename* to *bucket* under its basename and verify it.

    If an object with that name already exists and verify_file says it
    matches, the upload is skipped. Otherwise the file is uploaded (reduced
    redundancy storage, progress reported via handle_progress) and then
    verified.

    :returns: the boto ``Key`` for the object.
    :raises VerifyError: if the freshly uploaded object fails verification.
    """
    remote_name = os.path.basename(filename)
    s3key = Key(bucket)
    s3key.key = remote_name

    already_there = s3key.exists()
    if already_there and verify_file(bucket, filename):
        logger.warning("send_file: %s already exists and is identical, not overwriting", remote_name)
        return s3key
    if already_there:
        logger.warning("send_file: %s already exists on S3, overwriting", remote_name)

    s3key.set_contents_from_filename(filename, cb=handle_progress, reduced_redundancy=True)

    logger.debug("send_file: %s sent, verifying fidelity", filename)
    if verify_file(bucket, filename):
        return s3key
    raise VerifyError("verify failed")

def encryption_worker(in_q, out_q, unlink_q):
    """Encrypt work items from *in_q* until the 'STOP' sentinel is read.

    Each item is a ``(filename, gpgkey, comppath)`` triple handed to
    encrypt_file. The encrypted file's path is put on *out_q*, and the
    plaintext *filename* is put on *unlink_q* once encryption succeeded.
    """
    worker_began = time.time()
    handled = 0
    while True:
        item = in_q.get()
        if item == 'STOP':
            break
        plainfile, passphrase, compressor = item
        handled += 1
        item_began = time.time()
        logger.info("encryption_worker: encrypting %s", plainfile)
        out_q.put(encrypt_file(plainfile, passphrase, compressor))
        unlink_q.put(plainfile)
        logger.debug("encryption_worker: encrypted %s in %i seconds", plainfile, time.time()-item_began)
    logger.debug("encryption_worker: queue is empty, terminating after %i items in %i seconds", handled, time.time()-worker_began)
    time.sleep(5)   # settle

def sending_worker(in_q, out_q, accesskey, sharedkey, host):
    """Upload files from *in_q* to S3 until the 'STOP' sentinel is read.

    Each file is sent with send_file. After the first attempt, up to
    ``max_retries`` retries are made for S3 errors, socket errors and
    verification failures, with exponential backoff (2, 4, 8, ... seconds).
    The name of every successfully uploaded file is put on *out_q*; in
    ``__main__`` that is the unlink queue. Files that could not be uploaded
    are logged and left in place.
    """
    start_time = time.time()
    counter = 0
    max_retries = 10
    for filename in iter(in_q.get, 'STOP'):
        sending_start = time.time()
        counter += 1
        retry_count = 0
        done = False

        while not done:
            try:
                logger.info("sending_worker: sending %s", filename)
                bucket = open_s3(accesskey, sharedkey, host)
                key = send_file(bucket, filename)
                key.set_acl('private')
                key.close()
                done = True
            except (boto.exception.S3ResponseError, socket.error, VerifyError) as e:
                retry_count += 1
                if retry_count > max_retries:
                    # Out of retries: give up now rather than sleeping
                    # through one more (very long) backoff first.
                    logger.error('sending_worker: exception %s, not retrying', e)
                    break
                sleeptime = 2**retry_count
                logger.error('sending_worker: exception %s, retrying in %i seconds (%i/%i)', e, sleeptime, retry_count, max_retries)
                time.sleep(sleeptime)

        if not done:
            # trip out
            logger.error('sending_worker: could not upload %s in %i retries', filename, max_retries)
        else:
            size = os.path.getsize(filename)
            sending_seconds = time.time() - sending_start
            # Guard against a zero elapsed time on coarse clocks.
            bytespersecond = size / sending_seconds if sending_seconds > 0 else 0
            logger.debug("sending_worker: sent %s in %i seconds at %i bytes/second.", filename, sending_seconds, bytespersecond)
            out_q.put(filename)

    logger.debug("sending_worker: queue is empty, terminating after %i items in %i seconds", counter, time.time() - start_time)
    time.sleep(5)   # settle

def unlink_worker(in_q):
    """Delete each path read from *in_q* until the 'STOP' sentinel arrives.

    An OSError from a single deletion is logged as a warning and skipped, so
    one missing file does not stop the worker.
    """
    began = time.time()
    handled = 0
    while True:
        path = in_q.get()
        if path == 'STOP':
            break
        handled += 1
        logger.debug("unlink_worker: deleting %s", path)
        try:
            os.unlink(path)
        except OSError as err:
            logger.warning("unlink_worker: caught exception: %s", err)

    logger.debug("unlink_worker: queue is empty, terminating after %i items in %i seconds", handled, time.time() - began)
    time.sleep(5)   # settle

if __name__ == '__main__':
    # Script entry point, invoked by BackupPC as $Conf{ArchiveClientCmd}.
    # Flow:
    #   1. Either resume a previous partial run found in outLoc, or tar (and
    #      optionally split) the requested backup.
    #   2. Queue plaintext pieces for gpg and already-encrypted pieces for
    #      upload.
    #   3. Run encryption, sending and unlink workers, then upload a
    #      "<...>.tar.COMPLETE" marker once encryption has finished.

    # Read in arguments, verify that they match the BackupPC standard exactly
    if len(sys.argv) != 12:
        sys.stderr.write("Usage: %s tarCreatePath splitPath parPath host bkupNum compPath fileExt splitSize outLoc parFile share\n" % sys.argv[0])
        sys.exit(1)
    else:
        tarCreate   = sys.argv[1]
        splitPath   = sys.argv[2]
        parPath     = sys.argv[3]
        host        = sys.argv[4]
        bkupNum     = int(sys.argv[5])
        compPath    = sys.argv[6]
        fileExt     = sys.argv[7]
        splitSize   = int(sys.argv[8])
        outLoc      = sys.argv[9]
        parfile     = sys.argv[10]
        share       = sys.argv[11]

    for i in [tarCreate, compPath, splitPath, parPath]:
        # An empty path means "not configured"; anything else must be runnable.
        if i != '' and not is_exe(i):
            sys.stderr.write('Error: %s is not an executable program\n' % i)
            sys.exit(1)

    beginning = time.time()

    # Create queues for workers
    gpg_queue = Queue()
    send_queue = Queue()
    unlink_queue = Queue()

    queues = {
        'gpg_queue': gpg_queue,
        'send_queue': send_queue,
        'unlink_queue': unlink_queue,
    }

    # Is there already evidence of this having been done before?
    previous_files = glob.glob('%s/%s.*.tar.*' % (outLoc, host))
    if previous_files:
        logger.warning('main: finishing previous incomplete run')
        somefile = os.path.basename(previous_files[0])
        # Recover the backup number by stripping suffixes from the end of
        # "<host>.<bkupNum>.tar[.<2-char split suffix>][.gpg]" or of a
        # leftover "<host>.<bkupNum>.tar.COMPLETE" marker. Without handling
        # COMPLETE, int() would be applied to the string "COMPLETE".
        keyparts = somefile.split('.')
        if keyparts[-1] in ('gpg', 'COMPLETE'):
            keyparts.pop()
        if keyparts[-1] != 'tar' and len(keyparts[-1]) == 2:
            keyparts.pop()
        if keyparts[-1] == 'tar':
            keyparts.pop()

        bkupNum = int(keyparts.pop())

        filehead = '%s/%s.%i.tar.' % (outLoc, host, bkupNum)
        fileglob = filehead + '*'

        mesg = "Continuing upload for host %s, backup #%i" % (host, bkupNum)
        if splitSize > 0 and is_exe(splitPath):
            mesg += ', split into %i byte chunks' % splitSize
        if secrets.gpgsymmetrickey:
            mesg += ', encrypted with secret key'
        logger.info("main: %s", mesg)
    else:
        mesg = "Writing archive for host %s, backup #%i" % (host, bkupNum)

        tarcmd = [tarCreate, '-t']
        tarcmd.extend(['-h', host])
        tarcmd.extend(['-n', str(bkupNum)])
        tarcmd.extend(['-s', share])
        tarcmd.extend(['.'])

        splitcmd = None
        outfile = '%s/%s.%i.tar' % (outLoc, host, bkupNum)

        if splitSize > 0 and is_exe(splitPath):
            filehead = outfile + '.'
            fileglob = filehead + '*'
            splitcmd = [splitPath, '-b', str(splitSize), '-', filehead]
            mesg += ', split into %i byte chunks' % splitSize
        else:
            fileglob = outfile
            filehead = fileglob + '.'

        if secrets.gpgsymmetrickey:
            mesg += ', encrypted with secret key'

        logger.info("main: %s", mesg)
        logger.debug("main: executing tarcmd: %s > %s", ' '.join(tarcmd), outfile)

        with open(outfile, 'wb') as tarfp:
            proc = Popen(tarcmd, preexec_fn=lambda: os.nice(10), stdout=tarfp)
            proc.communicate()
        if proc.returncode != 0:
            # Never upload a failed/truncated archive as if it were a backup.
            logger.critical("main: tarcmd exited with status %i", proc.returncode)
            sys.exit(1)

        if splitcmd:
            logger.debug("main: executing splitcmd: %s", ' '.join(splitcmd))
            with open(outfile, 'rb') as tarfp:
                proc = Popen(splitcmd, preexec_fn=lambda: os.nice(10), stdin=tarfp)
                proc.communicate()
            if proc.returncode != 0:
                logger.critical("main: splitcmd exited with status %i", proc.returncode)
                # Remove partial chunks so the next run does not "resume"
                # from an incomplete split. fileglob ("<outfile>.*") does
                # not match outfile itself.
                for chunk in glob.glob(fileglob):
                    os.unlink(chunk)
                sys.exit(1)
            unlink_queue.put(outfile)

    logger.info("main: dumped %i files from %s #%i" % (len(glob.glob(fileglob)), host, bkupNum))

    # Pre-run to check for artifacts: a .gpg next to its plaintext means
    # encryption was interrupted, so that .gpg cannot be trusted.
    for i in glob.glob(fileglob):
        if not i.endswith('.gpg') and os.path.exists(i + '.gpg'):
            logger.warning("main: orphaned GPG file being deleted: %s", i + '.gpg')
            os.unlink(i + '.gpg')

    # Run again to send files to the relevant queue
    for i in sorted(glob.glob(fileglob)):
        if (secrets.gpgsymmetrickey
                and not i.endswith('.gpg')
                and not i.endswith('.COMPLETE')):
            # A tar file, unencrypted, needs encrypted.
            logger.debug("main: adding %s to gpg_queue", i)
            gpg_queue.put([i, secrets.gpgsymmetrickey, compPath])
        else:
            # either encryption is off, or the file is already encrypted
            logger.debug("main: adding %s to send_queue", i)
            send_queue.put(i)

    # Start some handlers, wait until everything is done
    try:
        process_count = cpu_count()
    except NotImplementedError:
        process_count = 1

    procs = []

    for i in range(process_count):
        p = Process(name="encryption_worker_%i" % i, target=encryption_worker, args=(gpg_queue, send_queue, unlink_queue))
        p.start()
        procs.append(p)

    send_p = Process(name="send_worker", target=sending_worker, args=(send_queue, unlink_queue, secrets.accesskey, secrets.sharedkey, host))
    send_p.start()
    procs.append(send_p)

    unlink_p = Process(name="unlink_worker", target=unlink_worker, args=(unlink_queue,))
    unlink_p.start()
    procs.append(unlink_p)

    send_queue_closed = False
    unlink_queue_closed = False

    # Put STOP command(s) at the end of the GPG queue: one per encryption
    # worker, since each worker consumes exactly one sentinel.
    for i in range(process_count):
        gpg_queue.put('STOP')

    for i in procs:
        # wait for each process to terminate in turn
        i.join()
        logger.debug("main: process terminated: %s", i.name)

        # count how many crypto processes are still running
        crypto_running = 0
        for j in procs:
            if j.name.startswith("encryption_worker") and j.is_alive():
                crypto_running += 1

        if crypto_running == 0 and not send_queue_closed:
            # crypto is done, close up the send queue
            logger.debug("main: queuing final file")
            finalfile = '%sCOMPLETE' % filehead
            with open(finalfile, 'w') as fp:
                fp.write('%s %s "%s"' % (beginning, time.time(), mesg))
            send_queue.put(finalfile)

            logger.debug("main: queuing stop sentinel for send_queue")
            send_queue.put('STOP')
            send_queue_closed = True

        if not send_p.is_alive() and not unlink_queue_closed:
            # sending is done, close up the unlink queue
            logger.debug("main: queuing stop sentinel for unlink_queue")
            unlink_queue.put('STOP')
            unlink_queue_closed = True

    for qname, q in queues.items():
        time.sleep(5)  # settle
        if not q.empty():
            logger.critical("main: queue %s not empty!", qname)
            raise Exception("queue not empty: %s" % qname)
        else:
            logger.debug("main: queue %s is empty", qname)

    logger.info("main: completed run after %i seconds", (time.time() - beginning))
